/*
 * Copyright [2013-2021], Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.polardbx.executor.operator.spill;

import com.alibaba.polardbx.common.exception.TddlRuntimeException;
import com.alibaba.polardbx.common.properties.FileConfig;
import com.alibaba.polardbx.common.utils.logger.Logger;
import com.alibaba.polardbx.common.utils.logger.LoggerFactory;
import com.alibaba.polardbx.executor.mpp.execution.buffer.PagesSerde;
import com.alibaba.polardbx.executor.mpp.execution.buffer.PagesSerdeFactory;
import com.alibaba.polardbx.optimizer.config.table.ColumnMeta;
import com.alibaba.polardbx.optimizer.core.datatype.DataType;
import com.alibaba.polardbx.optimizer.spill.LocalSpillMonitor;
import com.alibaba.polardbx.optimizer.spill.SpillMonitor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.SetThreadName;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.calcite.sql.OutFileParams;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_DATA_OUTPUT;
import static com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_OUT_OF_SPILL_FD;
import static com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_OUT_OF_SPILL_SPACE;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.file.Files.getFileStore;
import static java.nio.file.Files.newDirectoryStream;
import static java.util.Objects.requireNonNull;

/**
 * {@link SingleStreamSpillerFactory} that backs spillers with files below one or more root
 * directories and performs the file IO on dedicated reader / writer threads.
 *
 * <p>The constructor starts {@code maxThreadNum} reader threads and {@code maxThreadNum} writer
 * threads and binds thread ids to root paths in round-robin order. Every channel created through
 * this factory is counted against an optional file-descriptor limit; the count is decremented by
 * the close callback handed to the channel.
 */
public class AsyncFileSingleStreamSpillerFactory implements SingleStreamSpillerFactory {

    private static final Logger log = LoggerFactory.getLogger(AsyncFileSingleStreamSpillerFactory.class);

    private final PagesSerdeFactory serdeFactory;
    private final FileCleaner fileCleaner;
    // becomes part of every generated spill file name
    private final long startTime = System.nanoTime();
    private final int maxThreadNum;

    private final AtomicBoolean closed = new AtomicBoolean(false);
    // root paths of spillers (immutable snapshot of the list given to the constructor)
    private final List<Path> spillerRootPaths;
    // threadsAssign[pathIdx] holds, in ascending order, the ids of the IO threads bound to that path
    private final IntArrayList[] threadsAssign;
    private final AsyncFileSingleStreamSpillerFactory.ReaderThread[] readerThreads;
    private final AsyncFileSingleStreamSpillerFactory.WriterThread[] writerThreads;

    private final AtomicLong fileIdCreator = new AtomicLong(0);

    // guards fileFdUsed
    private final Object lock = new Object();
    private long fileFdUsed;
    // close callback handed to every channel; gives the acquired fd slot back
    private final Runnable noThrowableOnClose;
    private final double maxUsedSpaceThreshold;

    // values <= 0 disable the fd limit (see checkAndAquireFd)
    private final int theMaxFdLimit;

    /**
     * Creates a factory using the spill temp path and the maximum spill thread number taken
     * from {@link FileConfig}.
     *
     * @param fileCleaner cleaner used for the files created by this factory
     */
    public AsyncFileSingleStreamSpillerFactory(FileCleaner fileCleaner) {
        this(
            fileCleaner,
            ImmutableList.of(FileConfig.getInstance().getSpillerTempPath()),
            FileConfig.getInstance().getSpillConfig().getMaxSpillThreads());
    }

    /**
     * Creates a factory whose space threshold is taken from {@link FileConfig}.
     *
     * @param fileCleaner cleaner used for the files created by this factory
     * @param theSpillerRootPaths root directories for spill files, must not be empty
     * @param maxThreadNum number of reader threads and of writer threads, must be positive
     */
    public AsyncFileSingleStreamSpillerFactory(
        FileCleaner fileCleaner,
        List<Path> theSpillerRootPaths,
        int maxThreadNum) {
        this(fileCleaner, theSpillerRootPaths, maxThreadNum,
            FileConfig.getInstance().getSpillConfig().getAvaliableSpillSpaceThreshold());
    }

    /**
     * Creates a factory whose fd limit is taken from {@link FileConfig}.
     *
     * @param fileCleaner cleaner used for the files created by this factory
     * @param theSpillerRootPaths root directories for spill files, must not be empty
     * @param maxThreadNum number of reader threads and of writer threads, must be positive
     * @param maxUsedSpaceThreshold a path is usable while its usable space is greater than
     * {@code totalSpace * (1.0 - maxUsedSpaceThreshold)}
     */
    public AsyncFileSingleStreamSpillerFactory(
        FileCleaner fileCleaner,
        List<Path> theSpillerRootPaths,
        int maxThreadNum,
        double maxUsedSpaceThreshold) {
        this(fileCleaner, theSpillerRootPaths, maxThreadNum, maxUsedSpaceThreshold,
            FileConfig.getInstance().getSpillConfig().getMaxSpillFdThreshold());
    }

    /**
     * Creates the factory, makes sure the root directories exist and starts the IO threads.
     *
     * @param fileCleaner cleaner used for the files created by this factory
     * @param theSpillerRootPaths root directories for spill files, must not be empty
     * @param maxThreadNum number of reader threads and of writer threads, must be positive
     * @param maxUsedSpaceThreshold a path is usable while its usable space is greater than
     * {@code totalSpace * (1.0 - maxUsedSpaceThreshold)}
     * @param theMaxFdLimit maximum number of concurrently open channels; not enforced when
     * {@code <= 0}
     * @throws NullPointerException if {@code theSpillerRootPaths} or {@code fileCleaner} is null
     * @throws IllegalArgumentException if the path list is empty or {@code maxThreadNum <= 0}
     */
    public AsyncFileSingleStreamSpillerFactory(
        FileCleaner fileCleaner,
        List<Path> theSpillerRootPaths,
        int maxThreadNum,
        double maxUsedSpaceThreshold,
        int theMaxFdLimit) {

        log.info("load AsyncFileSingleStreamSpillerFactory as SingleStreamSpillerFactory");

        // validate every argument before touching the file system or starting threads
        requireNonNull(theSpillerRootPaths, "paths is null");
        this.serdeFactory = new PagesSerdeFactory(false);
        this.fileCleaner = requireNonNull(fileCleaner, "fileCleaner is null");
        Preconditions.checkArgument(theSpillerRootPaths.size() > 0, "spillPaths is empty");
        checkArgument(maxThreadNum > 0, "threadNum is not positive");

        this.maxThreadNum = maxThreadNum;
        // snapshot: threadsAssign is sized from this list, so it must not change afterwards
        this.spillerRootPaths = ImmutableList.copyOf(theSpillerRootPaths);
        this.maxUsedSpaceThreshold = maxUsedSpaceThreshold;
        this.theMaxFdLimit = theMaxFdLimit;
        this.fileFdUsed = 0;
        this.noThrowableOnClose = this::releaseFd;

        final int pathCount = spillerRootPaths.size();
        for (int i = 0; i < pathCount; i++) {
            log.info("init SpillerManager with path[" + i + "]:" + spillerRootPaths.get(i).toFile().getAbsolutePath());
        }

        // ensure dirs; a failure is only reported here, it surfaces later when a file is opened
        for (Path rootPath : spillerRootPaths) {
            File rootDir = rootPath.toFile();
            if (!rootDir.mkdirs() && !rootDir.isDirectory()) {
                log.warn("Could not create spill directory: " + rootDir.getAbsolutePath());
            }
        }

        // bind thread to path
        this.threadsAssign = new IntArrayList[pathCount];
        for (int i = 0; i < threadsAssign.length; i++) {
            threadsAssign[i] = new IntArrayList();
        }

        // round-robin assign thread to path: iterate max(threads, paths) slots so that every
        // path gets at least one thread and every thread gets at least one path
        final int slots = Math.max(maxThreadNum, pathCount);
        for (int slot = 0; slot < slots; slot++) {
            this.threadsAssign[slot % pathCount].add(slot % maxThreadNum);
        }

        // start IO threads
        this.readerThreads = new AsyncFileSingleStreamSpillerFactory.ReaderThread[maxThreadNum];
        for (int i = 0; i < maxThreadNum; i++) {
            this.readerThreads[i] = new AsyncFileSingleStreamSpillerFactory.ReaderThread(i);
            this.readerThreads[i].start();
        }
        this.writerThreads = new AsyncFileSingleStreamSpillerFactory.WriterThread[maxThreadNum];
        for (int i = 0; i < maxThreadNum; i++) {
            this.writerThreads[i] = new AsyncFileSingleStreamSpillerFactory.WriterThread(i);
            this.writerThreads[i].start();
        }
    }

    /**
     * Returns the reader thread with the given index.
     */
    @VisibleForTesting
    public ReaderThread getReadThread(int idx) {
        return readerThreads[idx];
    }

    /**
     * Returns the {@code _temporary} child of {@code base}.
     */
    @VisibleForTesting
    protected static File getTemPath(File base) {
        return new File(base, "_temporary");
    }

    /**
     * Creates a spiller. With non-null {@code params} an {@link AsyncFileSingleBufferSpiller}
     * writing to the file named by {@code params} is returned; otherwise an
     * {@link AsyncFileSingleStreamSpiller} writing to a newly named spill file.
     *
     * @param filePrefix user prefix embedded in the spill file name (unused when {@code params}
     * is given)
     * @param types column types used to build the page serde (unused when {@code params} is given)
     * @param spillMonitor monitor passed on to the spiller
     * @param params out-file parameters, may be null
     */
    @Override
    public SingleStreamSpiller create(
        String filePrefix, List<DataType> types, LocalSpillMonitor spillMonitor, OutFileParams params) {
        if (params != null) {
            return new AsyncFileSingleBufferSpiller(this, getNextFileHolder(params),
                (List<ColumnMeta>) params.getColumnMeata(), params, spillMonitor);
        } else {
            return new AsyncFileSingleStreamSpiller(this, getNextFileHolder(filePrefix),
                serdeFactory.createPagesSerde(types), spillMonitor);
        }
    }

    /**
     * Asks all writer and reader threads to stop. Only the first call has an effect.
     */
    public void close() {
        if (closed.compareAndSet(false, true)) {
            for (AsyncFileSingleStreamSpillerFactory.WriterThread wt : writerThreads) {
                wt.close();
            }
            for (AsyncFileSingleStreamSpillerFactory.ReaderThread rt : readerThreads) {
                rt.close();
            }
        }
    }

    /**
     * Returns the number of IO threads that may currently be used: the configured value from
     * {@link FileConfig} capped by the number of started threads, or the number of started
     * threads when the configured value is not positive.
     */
    private int getSpillerThreadsNum() {
        int dynamicSpillerThreads = FileConfig.getInstance().getSpillConfig().getMaxSpillThreads();
        if (dynamicSpillerThreads <= 0) {
            return maxThreadNum;
        }
        return Math.min(dynamicSpillerThreads, maxThreadNum);
    }

    /**
     * Picks one of the usable threads bound to a path.
     *
     * @param threadCandidates thread ids bound to the path; they are added in ascending order by
     * the constructor, so the ids below {@code spillerThreadsNum} form a prefix of the list
     * @param spillerThreadsNum only thread ids below this value may be used
     * @param randNum value used to spread files over the usable threads
     * @throws TddlRuntimeException if no candidate is below {@code spillerThreadsNum}
     */
    private int pickThreadForFile(IntArrayList threadCandidates, int spillerThreadsNum, long randNum) {
        int availableNum = 0;
        for (int i = 0; i < threadCandidates.size(); i++) {
            if (threadCandidates.getInt(i) < spillerThreadsNum) {
                availableNum++;
            }
        }

        if (availableNum == 0) {
            throw new TddlRuntimeException(ERR_OUT_OF_SPILL_SPACE, "No available thread for spill");
        }

        return threadCandidates.getInt((int) (randNum % availableNum));
    }

    /**
     * Builds a spill file name of the form
     * {@code spillprefix-starttime-pathIdx-threadId-userprefix-autoId-suffix}.
     */
    private String getFileName(int pathIdx, int threadId, String prefix, long autoId) {
        return String.format("%s-%s-%s-%s-%s-%010d%s", SPILL_FILE_PREFIX, startTime, pathIdx, threadId, prefix, autoId,
            SPILL_FILE_SUFFIX);
    }

    /**
     * Returns a holder for the out-file named by {@code outFileParams}, located in the single
     * root path and bound to the first thread of that path.
     *
     * @throws TddlRuntimeException if more than one root path is configured or the root path
     * does not have enough free space
     */
    protected FileHolder getNextFileHolder(OutFileParams outFileParams) {
        if (spillerRootPaths.size() > 1) {
            throw new TddlRuntimeException(ERR_DATA_OUTPUT, "OutFile can not have more than one path!");
        }
        Path rootPath = spillerRootPaths.get(0);
        if (!hasEnoughDiskSpace(rootPath)) {
            throw new TddlRuntimeException(ERR_OUT_OF_SPILL_SPACE, "No free space available for spill");
        }
        int threadId = threadsAssign[0].getInt(0);
        // NOTE(review): the file name is appended to the root path without any check here;
        // confirm that callers validate it (e.g. against ".." segments).
        Path assignPath = Paths.get(rootPath.toFile().getAbsolutePath(), outFileParams.getFileName());
        log.info(String.format("assign new file for spiller, %s", assignPath));
        return new FileHolder(assignPath, fileCleaner, threadId);
    }

    /**
     * Returns a holder for a new spill file. Root paths are probed starting from a position
     * derived from a running file id, and the first path with enough free space is used.
     *
     * <p>When the usable thread number does not exceed the path number, only the first
     * {@code spillerThreadsNum} paths are probed and the first thread bound to the chosen path is
     * used; otherwise all paths are probed and a thread is chosen by {@link #pickThreadForFile}.
     *
     * @param prefix user prefix embedded in the file name
     * @throws TddlRuntimeException if no probed path has enough free space
     */
    protected FileHolder getNextFileHolder(String prefix) {
        final long idBase = fileIdCreator.incrementAndGet();
        final int spillerThreadsNum = getSpillerThreadsNum();
        final boolean oneThreadPerPath = spillerThreadsNum <= spillerRootPaths.size();
        final int candidatePaths = oneThreadPerPath ? spillerThreadsNum : spillerRootPaths.size();

        for (int inc = 0; inc < candidatePaths; inc++) {
            long id = idBase + inc;
            int pathIdx = (int) (id % candidatePaths);
            Path rootPath = spillerRootPaths.get(pathIdx);
            if (!hasEnoughDiskSpace(rootPath)) {
                continue;
            }
            int threadId = oneThreadPerPath
                ? threadsAssign[pathIdx].getInt(0)
                : pickThreadForFile(threadsAssign[pathIdx], spillerThreadsNum, id);
            Path assignPath = Paths.get(rootPath.toFile().getAbsolutePath(),
                getFileName(pathIdx, threadId, prefix, idBase));
            log.info(String.format("assign new file for spiller, %s", assignPath));
            return new FileHolder(assignPath, fileCleaner, threadId);
        }
        throw new TddlRuntimeException(ERR_OUT_OF_SPILL_SPACE, "No free space available for spill");
    }

    /**
     * Reserves one fd slot.
     *
     * @return always {@code true}
     * @throws TddlRuntimeException if a positive fd limit is configured and already reached
     */
    protected boolean checkAndAquireFd() {
        synchronized (lock) {
            if (theMaxFdLimit > 0
                && this.fileFdUsed >= theMaxFdLimit) {
                throw new TddlRuntimeException(ERR_OUT_OF_SPILL_FD,
                    String.format("No available fd resource, limited:%s used:%s", theMaxFdLimit, this.fileFdUsed));
            }
            this.fileFdUsed++;
        }
        return true;
    }

    /**
     * Gives one fd slot back; this is the action behind the close callback of every channel.
     */
    private void releaseFd() {
        synchronized (lock) {
            fileFdUsed--;
        }
    }

    /**
     * Reserves an fd slot and creates a page writer bound to the writer thread of {@code fileId}.
     * The slot is released again if the writer cannot be constructed.
     */
    public AsyncPageFileChannelWriter createAsyncPageFileChannelWriter(
        FileHolder fileId, PagesSerde pagesSerde, SpillMonitor spillMonitor)
        throws IOException {
        checkAndAquireFd();
        try {
            return new AsyncPageFileChannelWriter(fileId, pagesSerde, writerThreads[fileId.getThreadId()].queue,
                noThrowableOnClose, spillMonitor);
        } catch (Throwable t) {
            // no channel is handed out, so nobody would ever run the close callback for this slot
            // (assumes a failing constructor does not run the callback itself - TODO confirm)
            releaseFd();
            throw t;
        }
    }

    /**
     * Reserves an fd slot and creates a buffer writer bound to the writer thread of
     * {@code fileId}. The slot is released again if the writer cannot be constructed.
     */
    public AsyncFileBufferWriter createAsyncPageFileChannelWriter(
        FileHolder fileId, List<ColumnMeta> columns,
        OutFileParams outFileParams, SpillMonitor spillMonitor)
        throws IOException {
        checkAndAquireFd();
        try {
            return new AsyncFileBufferWriter(fileId, columns, outFileParams, writerThreads[fileId.getThreadId()].queue,
                noThrowableOnClose, spillMonitor);
        } catch (Throwable t) {
            // no channel is handed out, so nobody would ever run the close callback for this slot
            // (assumes a failing constructor does not run the callback itself - TODO confirm)
            releaseFd();
            throw t;
        }
    }

    /**
     * Reserves an fd slot and creates a page reader bound to the reader thread of {@code fileId}.
     * The slot is released again if the reader cannot be constructed.
     */
    public AsyncPageFileChannelReader createAsyncPageFileChannelReader(
        FileHolder fileId, PagesSerde pagesSerde, SpillMonitor spillMonitor) throws IOException {
        checkAndAquireFd();
        try {
            return new AsyncPageFileChannelReader(fileId, pagesSerde, readerThreads[fileId.getThreadId()].queue,
                noThrowableOnClose, spillMonitor);
        } catch (Throwable t) {
            // no channel is handed out, so nobody would ever run the close callback for this slot
            // (assumes a failing constructor does not run the callback itself - TODO confirm)
            releaseFd();
            throw t;
        }
    }

    /**
     * Tells whether the file store of {@code path} has more usable space than
     * {@code totalSpace * (1.0 - maxUsedSpaceThreshold)}; logs a warning when it has not.
     *
     * @throws TddlRuntimeException if the file store cannot be queried
     */
    private boolean hasEnoughDiskSpace(Path path) {
        try {
            FileStore fileStore = getFileStore(path);
            long usableSpace = fileStore.getUsableSpace();
            long totalSpace = fileStore.getTotalSpace();
            boolean hasEnoughDisk =
                usableSpace > totalSpace * (1.0 - maxUsedSpaceThreshold);
            if (!hasEnoughDisk) {
                log.warn(String.format("disk path:%s not has enough disk usable:%s total:%s maxUseSpaceThreshold:%s",
                    path.toString(), usableSpace, totalSpace, maxUsedSpaceThreshold));
            }
            return hasEnoughDisk;
        } catch (IOException e) {
            throw new TddlRuntimeException(ERR_OUT_OF_SPILL_SPACE, e, "Cannot determine free space for spill");
        }
    }

    /**
     * Hands every entry found directly below each root path to the file cleaner.
     *
     * @return this factory
     */
    public SingleStreamSpillerFactory cleanupOldSpillFiles() {
        for (Path path : spillerRootPaths) {
            log.warn("Deleting old spill file in path: " + path);
            // delete temp spill file
            deleteFile(path);
        }
        return this;
    }

    /**
     * Recycles every entry of the directory {@code path}. Failures are logged and do not stop
     * the cleanup of the remaining entries.
     */
    private void deleteFile(Path path) {
        try (DirectoryStream<Path> stream = newDirectoryStream(path)) {
            for (Path spillFile : stream) {
                try {
                    log.info("Deleting old spill file: " + spillFile);
                    this.fileCleaner.recycleFile(new FileHolder(spillFile, this.fileCleaner));
                } catch (Exception e) {
                    // best effort, but keep the cause in the log
                    log.warn("Could not cleanup old spill file: " + spillFile, e);
                }
            }
        } catch (IOException e) {
            log.warn("Error cleaning spill files", e);
        }
    }

    /**
     * Thread that takes read requests from its own queue and executes them one by one until
     * {@link #close()} is called.
     */
    protected static class ReaderThread
        extends Thread {
        private final int threadId;
        private final ReadQueue queue = new ReadQueue();
        private volatile boolean closed;
        // number of requests processed, including failed ones
        protected AtomicInteger readCount = new AtomicInteger();
        // sum of the values returned by the read requests
        protected AtomicLong readBytes = new AtomicLong();

        public ReaderThread(int threadId) {
            this.threadId = threadId;
        }

        @Override
        public void run() {
            try (SetThreadName ignored = new SetThreadName("Spill-Reader-%d", threadId)) {
                while (!closed) {
                    // 1. take an IO request
                    IORequest.ReadIORequest request = null;
                    try {
                        request = queue.take();
                    } catch (InterruptedException interrupted) {
                        // close() interrupts a blocked take(); the loop condition decides whether
                        // to exit, so the interrupt is deliberately not propagated
                    }
                    if (request == null) {
                        continue;
                    }

                    // 2. do IO operation
                    IOException iex = null;
                    try {
                        readBytes.addAndGet(request.read());
                    } catch (IOException e) {
                        log.error("SpillReaderThread got IOException when do IO operation", e);
                        iex = e;
                    } catch (Throwable e) {
                        // any other failure is reported to the request wrapped in an IOException
                        iex = new IOException(e);
                        log.error("SpillReaderThread got unkown error when do IO operation", e);
                    }

                    this.readCount.incrementAndGet();

                    // 3. notify the request; a failing notification must not kill the thread
                    try {
                        request.requestDone(iex);
                    } catch (Throwable e) {
                        log.error("SpillReaderThread got unkown error when doing request finish", e);
                    }
                }
                log.info("SpillReaderThread was closed, exit!");
            }
        }

        /**
         * Marks the thread as closed and interrupts it so that a blocked queue take returns.
         */
        public void close() {
            this.closed = true;
            interrupt();
        }

        @VisibleForTesting
        public ReadQueue getQueue() {
            return queue;
        }
    }

    /**
     * Thread that takes write requests from its own queue and executes them one by one until
     * {@link #close()} is called.
     */
    private static class WriterThread
        extends Thread {
        private final int threadId;
        private final WriteQueue queue = new WriteQueue();
        private volatile boolean closed;
        // number of requests taken from the queue
        protected AtomicInteger writeCount = new AtomicInteger();
        // sum of the values returned by the write requests
        protected AtomicLong writeBytes = new AtomicLong();

        public WriterThread(int threadId) {
            this.threadId = threadId;
        }

        /**
         * Marks the thread as closed and interrupts it so that a blocked queue take returns.
         */
        public void close() {
            this.closed = true;
            interrupt();
        }

        @Override
        public void run() {
            try (SetThreadName ignored = new SetThreadName("Spill-Writer-%d", threadId)) {
                while (!closed) {
                    // 1. take an IO request
                    IORequest.WriteIORequest request = null;
                    try {
                        request = queue.take();
                    } catch (InterruptedException interrupted) {
                        // close() interrupts a blocked take(); the loop condition decides whether
                        // to exit, so the interrupt is deliberately not propagated
                    }
                    if (request == null) {
                        continue;
                    }
                    this.writeCount.incrementAndGet();

                    // 2. do IO operation
                    IOException iex = null;
                    try {
                        writeBytes.addAndGet(request.write());
                    } catch (IOException e) {
                        log.error("SpillWriterThread got IOException when do IO operation", e);
                        iex = e;
                    } catch (Throwable e) {
                        // any other failure is reported to the queue wrapped in an IOException
                        iex = new IOException(e);
                        log.error("SpillWriterThread got unkown error when do IO operation", e);
                    }

                    // 3. notify the queue; a failing notification must not kill the thread
                    try {
                        queue.requestDone(request, iex);
                    } catch (Throwable e) {
                        log.error("SpillWriterThread got unkown error when doing request finish", e);
                    }
                }
                log.info("SpillWriterThread was closed, exit!");
            }
        }
    }
}
